package rabbitmq

import (
	"encoding/json"
	"fmt"
	"github.com/streadway/amqp"
	"snow-im/app/utils"
	"snow-im/config"
	"strconv"
)

// Broker credentials and host used to build the AMQP dial URL. They are empty
// until InitRabbitMq copies them from the application configuration.
var mqUserName, mqPassword, mqUrl string

// RabbitMq is a stateless handle whose methods produce to and consume from
// RabbitMQ queues. It carries no fields; connection settings live in
// package-level variables.
type RabbitMq struct{}

func InitRabbitMq() {
	mqUrl = config.GetConf().RabbitMq.Url
	mqUserName = config.GetConf().RabbitMq.User
	mqPassword = config.GetConf().RabbitMq.Password
	fmt.Println(mqUrl, mqUserName, mqPassword)
}

// failOnError logs msg together with the text of err when err is non-nil and
// does nothing otherwise. Despite the name it does not itself abort anything:
// control always returns to the caller, which must decide whether to go on.
func failOnError(err error, msg string) {
	if err == nil {
		return
	}
	utils.Log(nil, msg, err.Error())
}

// Consume declares the durable queue name and passes every delivery from it
// to hand until the delivery stream ends.
//
// hand must be a func(tag uint64, ch *amqp.Channel, msg []byte). Any other
// type is logged and Consume returns before connecting. Deliveries are
// requested with auto-ack disabled, so hand receives the delivery tag and the
// channel it needs to acknowledge the message itself.
//
// Any failure while connecting, opening the channel, declaring the queue or
// registering the consumer is logged through failOnError and ends the call.
// A panic raised by hand is recovered and logged, which also ends the call.
// In every case the connection and channel are closed on return.
func (r RabbitMq) Consume(name string, hand interface{}) {
	defer func() {
		if err := recover(); err != nil {
			utils.Log(nil, "consume err", err)
		}
	}()

	// Validate the handler once up front instead of asserting per message.
	handle, ok := hand.(func(tag uint64, ch *amqp.Channel, msg []byte))
	if !ok {
		utils.Log(nil, "consume err", fmt.Sprintf("unsupported handler type %T", hand))
		return
	}

	conn, err := amqp.Dial("amqp://" + mqUserName + ":" + mqPassword + "@" + mqUrl + ":5672")
	if err != nil {
		failOnError(err, "Failed to connect to RabbitMQ55")
		return
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		failOnError(err, "Failed to open a channel")
		return
	}
	defer ch.Close()

	q, err := ch.QueueDeclare(
		name,  // name
		true,  // durable
		false, // delete when unused
		false, // exclusive
		false, // no-wait
		nil,   // arguments
	)
	if err != nil {
		failOnError(err, "Failed to declare a queue")
		return
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		failOnError(err, "Failed to register a consumer")
		return
	}

	// The loop runs in the calling goroutine and ends when msgs is closed.
	// Returning at that point lets the deferred Close calls release the
	// channel and connection instead of parking the goroutine forever.
	for d := range msgs {
		handle(d.DeliveryTag, ch, d.Body)
	}
}
// Produce JSON-encodes log and publishes it through the default exchange to
// the durable queue named "<AppName>_<name>", where AppName comes from the
// application configuration.
//
// When delayTime is greater than zero the message gets a per-message
// expiration of delayTime seconds, and the queue is declared with dead-letter
// arguments: args[0] must be a string naming the dead-letter exchange, and the
// dead-letter routing key is "dely". When delayTime is zero or negative, args
// is ignored and the queue is declared without extra arguments.
//
// A fresh connection and channel are opened for every call and closed before
// it returns. The first error encountered (invalid args, encoding, dialing,
// opening the channel, declaring the queue or publishing) is returned;
// broker and encoding errors are also logged through failOnError.
func (r RabbitMq) Produce(name string, log interface{}, delayTime int, args ...interface{}) error {
	delayed := delayTime > 0

	// Resolve the queue arguments before touching the network so that a bad
	// call is rejected with an error instead of an index or assertion panic.
	var queueArgs amqp.Table
	if delayed {
		if len(args) == 0 {
			return fmt.Errorf("rabbitmq: delayed publish to %q needs the dead-letter exchange as first extra argument", name)
		}
		exchange, ok := args[0].(string)
		if !ok {
			return fmt.Errorf("rabbitmq: dead-letter exchange must be a string, got %T", args[0])
		}
		queueArgs = amqp.Table{
			// Expired messages are forwarded to this exchange.
			"x-dead-letter-exchange":    exchange,
			"x-dead-letter-routing-key": "dely",
		}
	}

	body, err := json.Marshal(log)
	if err != nil {
		failOnError(err, "Failed to encode a message")
		return err
	}

	name = config.GetConf().AppName + "_" + name

	conn, err := amqp.Dial("amqp://" + mqUserName + ":" + mqPassword + "@" + mqUrl + ":5672")
	if err != nil {
		failOnError(err, "Failed to connect to RabbitMQ11")
		return err
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		failOnError(err, "Failed to open a channel")
		return err
	}
	defer ch.Close()

	q, err := ch.QueueDeclare(
		name,      // name
		true,      // durable
		false,     // delete when unused
		false,     // exclusive
		false,     // no-wait
		queueArgs, // arguments
	)
	if err != nil {
		failOnError(err, "Failed to declare a queue")
		return err
	}

	msg := amqp.Publishing{
		ContentType: "text/plain",
		Body:        body,
	}
	if delayed {
		// Expiration is expressed in milliseconds, delayTime in seconds.
		msg.Expiration = strconv.Itoa(delayTime * 1000)
	}

	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		msg,
	)
	failOnError(err, "Failed to publish a message")
	return err
}
// DelayConsume sets up a dead-letter based delay topology and passes every
// message that arrives on the target queue to hand.
//
// It declares, in order:
//   - the durable direct exchange to;
//   - the durable queue "delay_"+name, whose dead-letter exchange is to and
//     whose dead-letter routing key is "dely";
//   - the durable queue "delay_"+to, bound to exchange to with key "dely".
//
// Messages are then consumed from "delay_"+to with auto-ack disabled, so hand
// receives the delivery tag and channel it needs to acknowledge them.
//
// Any failure during setup is logged through failOnError and ends the call.
// A panic raised by hand is recovered and logged, which also ends the call.
// The function returns when the delivery stream is closed; the connection and
// channel are closed on return in every case.
func DelayConsume(name string, to string, hand func(tag uint64, ch *amqp.Channel, msg []byte)) {
	defer func() {
		if err := recover(); err != nil {
			utils.Log(nil, "consume err", err)
		}
	}()

	conn, err := amqp.Dial("amqp://" + mqUserName + ":" + mqPassword + "@" + mqUrl + ":5672")
	if err != nil {
		failOnError(err, "Failed to connect to RabbitMQ")
		return
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		failOnError(err, "Failed to open a channel")
		return
	}
	defer ch.Close()

	// Exchange that receives messages once they expire in the delay queue.
	err = ch.ExchangeDeclare(
		to,       // name
		"direct", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	if err != nil {
		failOnError(err, "Failed to declare an exchange")
		return
	}

	// Delay queue: expired messages are dead-lettered to the exchange above.
	_, err = ch.QueueDeclare(
		"delay_"+name, // name
		true,          // durable
		false,         // delete when unused
		false,         // exclusive
		false,         // no-wait
		amqp.Table{
			"x-dead-letter-exchange":    to,
			"x-dead-letter-routing-key": "dely",
		}, // arguments
	)
	if err != nil {
		failOnError(err, "Failed to declare a queue")
		return
	}

	// Target queue that collects the dead-lettered messages for consumption.
	q, err := ch.QueueDeclare(
		"delay_"+to, // name
		true,        // durable
		false,       // delete when unused
		false,       // exclusive
		false,       // no-wait
		nil,         // arguments
	)
	if err != nil {
		failOnError(err, "Failed to declare a queue")
		return
	}

	if err = ch.QueueBind(q.Name, "dely", to, false, nil); err != nil {
		failOnError(err, "Failed to bind a queue")
		return
	}

	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		false,  // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	if err != nil {
		failOnError(err, "Failed to register a consumer")
		return
	}

	// The loop runs in the calling goroutine and ends when msgs is closed.
	// Returning at that point lets the deferred Close calls release the
	// channel and connection instead of parking the goroutine forever.
	for d := range msgs {
		hand(d.DeliveryTag, ch, d.Body)
	}
}
